package com.atguigu.utils;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.common.Constant;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Factory helpers for Flink's Kafka connector: builds {@link KafkaSource} /
 * {@link KafkaSink} instances for the DataStream API and Flink SQL
 * {@code WITH (...)} DDL fragments, all pointing at the cluster configured in
 * {@link Constant#KAFKA_SERVERS}.
 */
public class KafkaUtil {

    /** Utility class: static methods only, never instantiated. */
    private KafkaUtil() {
    }

    /**
     * Builds a KafkaSource that reads string values from the given topic,
     * starting from the latest offsets.
     *
     * @param topic   Kafka topic to consume
     * @param groupId consumer group id
     * @return a KafkaSource whose record values are UTF-8 decoded strings;
     *         null payloads (e.g. tombstone records) are mapped to {@code ""}
     *         so downstream operators never receive null elements
     */
    public static KafkaSource<String> getKafkaSource(String topic, String groupId) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(Constant.KAFKA_SERVERS)
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new DeserializationSchema<String>() {
                    @Override
                    public String deserialize(byte[] message) throws IOException {
                        // Tombstone / null payloads become "" instead of null.
                        if (message == null) {
                            return "";
                        }
                        // Decode with an explicit charset: new String(byte[])
                        // would use the platform default, which breaks on
                        // non-UTF-8 hosts.
                        return new String(message, StandardCharsets.UTF_8);
                    }

                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        // Unbounded stream: never signals end-of-stream.
                        return false;
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                })
                .build();
    }

    /**
     * Builds a KafkaSink that writes each string element, UTF-8 encoded, to a
     * fixed topic.
     *
     * @param topic destination topic for every record
     * @return a KafkaSink for string payloads
     */
    public static KafkaSink<String> getKafkaSink(String topic) {
        return KafkaSink.<String>builder()
                .setBootstrapServers(Constant.KAFKA_SERVERS)
                .setRecordSerializer(new KafkaRecordSerializationSchema<String>() {
                    @Nullable
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                        // Encode with an explicit charset; getBytes() alone
                        // would depend on the platform default.
                        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
                    }
                })
                .build();
    }

    /**
     * Builds a KafkaSink that routes each JSONObject dynamically: the element's
     * {@code "topic"} field names the destination topic and its {@code "data"}
     * field is the record payload (UTF-8 encoded).
     *
     * <p>NOTE(review): elements missing either field will cause an NPE at
     * serialization time — callers must guarantee both keys are present.
     *
     * @return a KafkaSink with per-element topic routing
     */
    public static KafkaSink<JSONObject> getKafkaSink() {
        return KafkaSink.<JSONObject>builder()
                .setBootstrapServers(Constant.KAFKA_SERVERS)
                .setRecordSerializer(new KafkaRecordSerializationSchema<JSONObject>() {
                    @Nullable
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(JSONObject element, KafkaSinkContext context, Long timestamp) {
                        return new ProducerRecord<>(element.getString("topic"),
                                element.getString("data").getBytes(StandardCharsets.UTF_8));
                    }
                })
                .build();
    }

    /**
     * Builds a KafkaSink with a caller-supplied record serializer, for cases
     * the fixed-topic and JSON-routing overloads do not cover.
     *
     * @param kafkaRecordSerializationSchema serializer deciding topic/key/value
     * @param <T>                            element type of the sink
     * @return a KafkaSink using the given serializer
     */
    public static <T> KafkaSink<T> getKafkaSink(KafkaRecordSerializationSchema<T> kafkaRecordSerializationSchema) {
        return KafkaSink.<T>builder()
                .setBootstrapServers(Constant.KAFKA_SERVERS)
                .setRecordSerializer(kafkaRecordSerializationSchema)
                .build();
    }

    /**
     * Returns a Flink SQL {@code WITH (...)} fragment for a JSON Kafka source
     * table reading from the latest offset. Begins with a space so it can be
     * appended directly after a column-list closing paren.
     *
     * @param topic   source topic
     * @param groupId consumer group id
     * @return the connector options fragment
     */
    public static String getKafkaSourceDDL(String topic, String groupId) {
        return " with (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = '" + topic + "',\n" +
                "    'properties.bootstrap.servers' = '" + Constant.KAFKA_SERVERS + "',\n" +
                "    'properties.group.id' = '" + groupId + "',\n" +
                "    'scan.startup.mode' = 'latest-offset',\n" +
                "    'format' = 'json'\n" +
                ")";
    }

    /**
     * Returns a complete {@code CREATE TABLE topic_db} statement over the ODS
     * database-change topic, including a processing-time column ({@code pt}),
     * an event-time column {@code rt} derived from the epoch-seconds field
     * {@code ts}, and a 2-second watermark on {@code rt}.
     *
     * @param groupId consumer group id for the underlying Kafka source
     * @return the full DDL statement
     */
    public static String getTopicDbDDL(String groupId) {
        return "create table topic_db(\n" +
                "    `database` string,\n" +
                "    `table` string,\n" +
                "    `type` string,\n" +
                "    `ts` bigint,\n" +
                "    `data` map<string,string>,\n" +
                "    `old` map<string,string>,\n" +
                "    `pt` as PROCTIME(),\n" +
                "    `rt` as TO_TIMESTAMP_LTZ(`ts`, 0),\n" +
                "    WATERMARK FOR `rt` AS `rt` - INTERVAL '2' SECOND\n" +
                ")" + getKafkaSourceDDL(Constant.TOPIC_ODS_DB, groupId);
    }

    /**
     * Returns a Flink SQL {@code WITH (...)} fragment for a JSON Kafka sink
     * table. Begins with a space — matching {@link #getKafkaSourceDDL} and
     * {@link #getUpsertKafkaSinkDDL} — so it can be appended directly after a
     * closing paren (the original omitted it, unlike its siblings).
     *
     * @param topic destination topic
     * @return the connector options fragment
     */
    public static String getKafkaSinkDDL(String topic) {
        return " with (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = '" + topic + "',\n" +
                "    'properties.bootstrap.servers' = '" + Constant.KAFKA_SERVERS + "',\n" +
                "    'format' = 'json'\n" +
                ")";
    }

    /**
     * Returns a Flink SQL {@code WITH (...)} fragment for an upsert-kafka sink
     * table (JSON key and value formats). Begins with a space so it can be
     * appended directly after a closing paren.
     *
     * @param topic destination topic
     * @return the connector options fragment
     */
    public static String getUpsertKafkaSinkDDL(String topic) {
        return " with (\n" +
                "    'connector' = 'upsert-kafka',\n" +
                "    'topic' = '" + topic + "',\n" +
                "    'properties.bootstrap.servers' = '" + Constant.KAFKA_SERVERS + "',\n" +
                "    'key.format' = 'json',\n" +
                "    'value.format' = 'json'\n" +
                ")";
    }

}
